// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#ifndef IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
#define IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H

/// This scanner parses Sequence files and writes the content as tuples in the Impala
/// in-memory representation of data (tuples, rows, row batches).
///
/// TODO: Make the various sequence file formats behave more similarly.  They should
/// all have a structure similar to block compressed operating in batches rather than
/// row at a time.
///
/// org.apache.hadoop.io.SequenceFile is the original SequenceFile implementation
/// and should be viewed as the canonical definition of this format. If
/// anything is unclear in this file you should consult the code in
/// org.apache.hadoop.io.SequenceFile.
///
/// The following is a pseudo-BNF grammar for SequenceFile. Comments are prefixed
/// with dashes:
///
/// seqfile ::=
///   <file-header>
///   <record-block>+
///
/// record-block ::=
///   <record>+
///   <file-sync-hash>
///
/// file-header ::=
///   <file-version-header>
///   <file-key-class-name>
///   <file-value-class-name>
///   <file-is-compressed>
///   <file-is-block-compressed>
///   [<file-compression-codec-class>]
///   <file-header-metadata>
///   <file-sync-field>
///
/// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
///
/// -- The name of the Java class responsible for reading the key buffer
///
/// file-key-class-name ::=
///   Text {"org.apache.hadoop.io.BytesWritable"}
///
/// -- The name of the Java class responsible for reading the value buffer
///
/// -- We don't care what this is.
/// file-value-class-name ::= Text
///
/// -- Boolean variable indicating whether or not the file uses compression
/// -- for key/values in this file
///
/// file-is-compressed ::= Byte[1]
///
/// -- A boolean field indicating whether or not the file is block compressed.
///
/// file-is-block-compressed ::= Byte[1] {false}
///
/// -- The Java class name of the compression codec iff <file-is-compressed>
/// -- is true. The named class must implement
/// -- org.apache.hadoop.io.compress.CompressionCodec.
/// -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
///
/// file-compression-codec-class ::= Text
///
/// -- A collection of key-value pairs defining metadata values for the
/// -- file. The Map is serialized using standard JDK serialization, i.e.
/// -- an Int corresponding to the number of key-value pairs, followed by
/// -- Text key and value pairs.
///
/// file-header-metadata ::= Map<Text, Text>
///
/// -- A 16 byte marker that is generated by the writer. This marker appears
/// -- at regular intervals at the beginning of records or record blocks and is
/// -- intended to enable readers to skip to a random part of the file.
/// -- The sync hash is preceded by a length of -1, referred to as the sync marker.
///
/// file-sync-hash ::= Byte[16]
///
/// -- Records are all of one type as determined by the compression bits in the header
///
/// record ::=
///   <uncompressed-record>     |
///   <block-compressed-record> |
///   <record-compressed-record>
///
/// uncompressed-record ::=
///   <record-length>
///   <key-length>
///   <key>
///   <value>
///
/// record-compressed-record ::=
///   <record-length>
///   <key-length>
///   <key>
///   <compressed-value>
///
/// block-compressed-record ::=
///   <file-sync-field>
///   <key-lengths-block-size>
///   <key-lengths-block>
///   <keys-block-size>
///   <keys-block>
///   <value-lengths-block-size>
///   <value-lengths-block>
///   <values-block-size>
///   <values-block>
///
/// record-length ::= Int
/// key-length ::= Int
/// key-lengths-block-size ::= Int
/// value-lengths-block-size ::= Int
///
/// keys-block ::= Byte[keys-block-size]
/// values-block ::= Byte[values-block-size]
///
/// -- The key-lengths and value-lengths blocks are a sequence of lengths encoded
/// -- in ZeroCompressedInteger (VInt) format.
///
/// key-lengths-block ::= Byte[key-lengths-block-size]
/// value-lengths-block ::= Byte[value-lengths-block-size]
///
/// Byte ::= An eight-bit byte
///
/// VInt ::= Variable length integer. The high-order bit of each byte
/// indicates whether more bytes remain to be read. The low-order seven
/// bits are appended as increasingly more significant bits in the
/// resulting integer value.
///
/// Int ::= A four-byte integer in big-endian format.
///
/// Text ::= VInt, Chars (Length prefixed UTF-8 characters)

#include "exec/base-sequence-scanner.h"

namespace impala {

/// Forward declaration of the DelimitedTextParser class template, which takes a single
/// (unnamed here) bool template parameter. Only a declaration is needed in this header
/// because the scanner holds the parser through a smart pointer member; the scanner
/// uses the DelimitedTextParser<false> instantiation.
template <bool>
class DelimitedTextParser;

/// Scanner that parses Sequence files and writes their content as tuples in the
/// Impala in-memory representation of data (tuples, rows, row batches).
/// Builds on BaseSequenceScanner and implements its file-format specific hooks.
class HdfsSequenceScanner : public BaseSequenceScanner {
 public:
  /// The four byte SeqFile version header present at the beginning of every
  /// SeqFile file: {'S', 'E', 'Q', 6}
  static const uint8_t SEQFILE_VERSION_HEADER[4];

  HdfsSequenceScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);

  virtual ~HdfsSequenceScanner();

  /// Implementation of HdfsScanner interface.
  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;

  /// Codegen WriteAlignedTuples(). Stores the resulting function in
  /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
  static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
      llvm::Function** write_aligned_tuples_fn);

 protected:
  /// Implementation of sequence container super class methods.
  /// Marked 'override' so that any signature drift from the base class is reported
  /// at compile time instead of silently introducing a new virtual function.
  virtual FileHeader* AllocateFileHeader() override;
  virtual Status ReadFileHeader() override WARN_UNUSED_RESULT;
  virtual Status InitNewRange() override WARN_UNUSED_RESULT;
  virtual Status ProcessRange(RowBatch* row_batch) override WARN_UNUSED_RESULT;

  /// Returns the file format handled by this scanner, which is always SEQUENCE_FILE.
  virtual THdfsFileFormat::type file_format() const override {
    return THdfsFileFormat::SEQUENCE_FILE;
  }

 private:
  /// Maximum size of a compressed block.  This is used to check for corrupted
  /// block size so we do not read the whole file before we detect the error.
  static const int64_t MAX_BLOCK_SIZE = 1024 * 1024 * 1024;

  /// The value class name located in the SeqFile Header.
  /// This is always "org.apache.hadoop.io.Text"
  static const char* const SEQFILE_VALUE_CLASS_NAME;

  /// Reads the record header and sets 'current_block_length_'.
  Status ReadBlockHeader() WARN_UNUSED_RESULT;

  /// Processes or continues processing a block-compressed scan range, adding tuples
  /// to 'row_batch'. Block-compressed ranges are common and can be parsed more
  /// efficiently in larger pieces.
  Status ProcessBlockCompressedScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;

  /// Reads a compressed block. Does NOT read sync or -1 marker preceding sync.
  /// Decompresses the data into 'unparsed_data_buffer_' allocated from the
  /// 'data_buffer_pool_' via the decompressor.
  /// Sets 'num_buffered_records_in_compressed_block_' if decompression was
  /// successful.
  Status ReadCompressedBlock() WARN_UNUSED_RESULT;

  /// Utility function for parsing 'next_record_in_compressed_block_'. Called by
  /// ProcessBlockCompressedScanRange().
  Status ProcessDecompressedBlock(RowBatch* row_batch) WARN_UNUSED_RESULT;

  /// Read a single record from the current position in 'stream_', decompressing
  /// the record, if necessary. Not used for block compressed files.
  /// Output:
  ///   record_ptr: pointer to the record
  ///   record_len: length of the record
  Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;

  /// Helper class for picking fields and rows from delimited text.
  boost::scoped_ptr<DelimitedTextParser<false>> delimited_text_parser_;
  std::vector<FieldLocation> field_locations_;

  /// Data that is fixed across headers.  This struct is shared between scan ranges.
  struct SeqFileHeader : public BaseSequenceScanner::FileHeader {
    /// If true, the file uses row compression
    bool is_row_compressed;
  };

  /// Struct for record locations and lens in compressed blocks.
  struct RecordLocation {
    uint8_t* record;
    int64_t len;
  };

  /// Records are processed in batches. This vector stores batches of record locations
  /// that are being processed.
  std::vector<RecordLocation> record_locations_;

  /// Length of the current sequence file block (or record).
  int current_block_length_ = -1;

  /// Length of the current key.  This is specified as 4 bytes in the format description.
  int current_key_length_ = -1;

  /// Buffer for data read from the 'stream_' directly or after decompression.
  uint8_t* unparsed_data_buffer_ = nullptr;

  /// End of data buffer used to check out of bound error.
  uint8_t* data_buffer_end_ = nullptr;

  /// Number of records buffered in 'unparsed_data_buffer_' from block compressed data.
  int64_t num_buffered_records_in_compressed_block_ = 0;

  /// Length of the next record from block compressed data.
  int64_t next_record_in_compressed_block_len_ = 0;

  /// Pointer to the next record from block compressed data.
  uint8_t* next_record_in_compressed_block_ = nullptr;
};

} // namespace impala

#endif // IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
